#!/usr/bin/env python3
#==============================================================#
# File      :   pg-role
# Desc      :   fetch pgsql instance role using different approach
# Ctime     :   2020-05-16
# Mtime     :   2025-03-17
# Path      :   /pg/bin/pg-role
# Depend    :   psql, sudo, ps
# License   :   AGPLv3 @ https://doc.pgsty.com/about/license
# Copyright :   2018-2025  Ruohang Feng / Vonng (rh@vonng.com)
#==============================================================#
import os, pwd

import os
import pwd

def get_user():
    """
    Return the login name of the effective user running this script.

    :return: user name, or "" when the effective uid has no passwd entry
    """
    euid = os.geteuid()
    try:
        entry = pwd.getpwuid(euid)
    except KeyError:
        # uid without a passwd record (e.g. arbitrary uid inside a container)
        return ""
    return entry.pw_name


def role_result(pg_alive, pg_role, source):
    """
    Bundle one detection outcome into a plain dict.

    :param pg_alive: liveness verdict, stored under "alive"
    :param pg_role: role verdict, stored under "role"
    :param source: name of the detection method, stored under "source"
    :return: dict with keys "alive", "role" and "source"
    """
    return dict(alive=pg_alive, role=pg_role, source=source)

# this can be run as any user
def pg_role_from_ps():
    """
    Infer postgres liveness and role from the process list of user postgres.

    The command lines reported by `ps h -u postgres -o command` are scanned
    for well-known postgres process titles:

      * a postmaster command line (contains both 'post' and '-D'), or any
        'postgres:' auxiliary process (logger, checkpointer, background
        writer, stats collector, walwriter, walsender) marks the instance
        as alive
      * a 'postgres:' process whose title contains 'walreceiver' or
        'recovering' marks the instance as being in recovery

    This method always returns a result dict, though the role may be None.

    :return: role_result dict with source "ps"; alive is True | False,
             role is 'replica' | 'primary' when alive, otherwise None
    """
    # the context manager closes the pipe (and reaps ps) deterministically
    with os.popen('ps h -u postgres -o command') as pipe:
        processes = [line.rstrip('\n') for line in pipe]

    # titles of postgres child processes that prove the instance is running
    alive_markers = ('logger', 'checkpointer', 'background writer',
                     'stats collector', 'walwriter', 'walsender')

    pg_alive = False
    pg_recovery = False
    for proc in processes:
        # postmaster itself, started with an explicit data directory
        if 'post' in proc and '-D' in proc:
            pg_alive = True

        if 'postgres:' not in proc:
            continue
        if any(marker in proc for marker in alive_markers):
            pg_alive = True
        # a wal receiver or a recovering startup process only exists on a replica
        if 'walreceiver' in proc or 'recovering' in proc:
            pg_recovery = True

    pg_role = None
    if pg_alive:  # the role can only be told apart while pg is alive
        pg_role = 'replica' if pg_recovery else 'primary'
    return role_result(pg_alive, pg_role, "ps")


def pg_role_from_psql():
    """
    Infer the postgres role by asking the server through psql.

    Runs `SELECT pg_is_in_recovery()` with psql, directly when the current
    user is postgres and through `sudo -n -iu postgres` otherwise. If the
    user running this script has neither password-less access to postgres
    nor password-less sudo privilege, the query fails and no role is reported.

    When it succeeds, this result takes precedence over the other methods.

    :return: role_result dict with source "psql"; role is 'primary' |
             'replica' with alive True when the query answered, otherwise
             both role and alive are None
    """
    cmd = """psql -AXtqwc 'SELECT pg_is_in_recovery()'"""
    if get_user() != 'postgres':
        # -n: never prompt for a password, fail instead of blocking
        cmd = """sudo -n -iu postgres """ + cmd

    pg_alive, pg_role = None, None
    try:
        # the context manager closes the pipe and reaps the child process
        with os.popen(cmd) as pipe:
            res = pipe.read().strip()
    except (OSError, ValueError):
        # best effort: the command could not be spawned or its output could
        # not be decoded, so report "unknown" and let the caller fall back
        res = ''

    if res == 'f':
        pg_role, pg_alive = 'primary', True
    elif res == 't':
        pg_role, pg_alive = 'replica', True
    return role_result(pg_alive, pg_role, "psql")


# this is used when pg is not alive
def pg_role_from_pgdata(pgdata="/pg/data"):
    """
    Infer the postgres role from the files inside the data directory.

    Rules, applied in this order (later ones override earlier ones):

      * a directory with more than 10 entries is treated as an initialized
        cluster and defaults to 'primary'
      * standby.signal or recovery.signal present => 'replica'
      * recovery.conf containing a line that starts with primary_conninfo
        or restore_command => 'replica'

    The presence of postmaster.pid sets alive to True, which is only a hint.

    :param pgdata: path of the postgres data directory
    :return: role_result dict with source "pgdata"; role is 'primary' |
             'replica' | None, alive is True | False, or None when the
             directory can not be listed
    """
    try:
        flist = os.listdir(pgdata)
    except (OSError, ValueError):
        # missing, unreadable or invalid path: nothing can be inferred
        return role_result(None, None, "pgdata")

    names = set(flist)
    pg_alive = 'postmaster.pid' in names  # maybe: the pid file can be stale
    pg_role = 'primary' if len(flist) > 10 else None  # not an empty dir

    if 'standby.signal' in names or 'recovery.signal' in names:
        pg_role = 'replica'

    if 'recovery.conf' in names:
        try:
            with open(os.path.join(pgdata, 'recovery.conf')) as conf:
                for line in conf:
                    if line.startswith(('primary_conninfo', 'restore_command')):
                        pg_role = 'replica'
                        break
        except (OSError, ValueError):
            # best effort: an unreadable or undecodable recovery.conf
            # leaves the role inferred so far untouched
            pass

    return role_result(pg_alive, pg_role, "pgdata")


def get_pg_role(pgdata="/pg/data"):
    """
    Try multiple ways to get the role:
      1) From ps, which serves as the baseline
      2) If alive => cross-check with psql (psql wins on conflict)
      3) If not alive => from the data directory

    The "alive" value of the result always comes from the ps check; only
    "role" and "source" are overridden by the psql or pgdata results.

    :param pgdata: data directory inspected when pg is not alive; reading it
                   requires running as postgres or root
    :return: role_result dict with keys "alive", "role" and "source"
    """
    import sys  # local import: only needed for the warning below

    # the process check usually yields a result and serves as the baseline
    role_ps = pg_role_from_ps()
    pg_alive, pg_role, source = role_ps["alive"], role_ps["role"], role_ps["source"]

    if pg_alive:
        # pg is alive: double check with psql, whose answer always wins
        role_psql = pg_role_from_psql()
        if role_psql["role"] is not None:
            if role_psql["role"] != role_ps["role"]:
                # warn on stderr so that stdout carries nothing but the role
                print("[WARN] psql role %s is not equal to ps role %s"
                      % (role_psql["role"], role_ps["role"]), file=sys.stderr)
            pg_role, source = role_psql["role"], role_psql["source"]
        return role_result(pg_alive, pg_role, source)

    # pg is dead: infer the role from the data directory
    role_pgdata = pg_role_from_pgdata(pgdata)
    if role_pgdata["role"] is not None:
        pg_role, source = role_pgdata["role"], role_pgdata["source"]
    return role_result(pg_alive, pg_role, source)


def pg_role():
    """
    Return the detected postgres role as a plain string.

    This is the top-level boundary of the script: any detection failure is
    reported as "unknown" instead of propagating.

    :return: the role reported by get_pg_role(), or "unknown" when no role
             could be determined or the detection raised an error
    """
    try:
        role = get_pg_role()["role"]
    except Exception:
        # Exception (not a bare except) so that KeyboardInterrupt and
        # SystemExit still terminate the script
        return "unknown"
    return role if role is not None else "unknown"


#--------------------------------------------------------------#
#                             Main                             #
#--------------------------------------------------------------#
if __name__ == '__main__':
    # script entry point: write the detected role (or "unknown") to stdout
    detected_role = pg_role()
    print(detected_role)
